package com.dahuan.tables;

import com.dahuan.bean.SensorReading;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Demonstrates Flink Over windows on an event-time stream, twice:
 * once via the Table API ({@code Over.partitionBy(...).orderBy(...)}) and
 * once via SQL ({@code WINDOW ow AS (...)}) — both partition by sensor id,
 * order by the row-time attribute, and aggregate over the preceding 2 rows
 * plus the current row.
 *
 * <p>Input is a CSV text file with lines of the form {@code id,timestamp,temperature}.
 */
public class Table_Over_Window {
    /** Default input file used when no path argument is supplied. */
    private static final String DEFAULT_INPUT_PATH =
            "E:\\Project\\FlinkTutorials\\Flink-Scala\\src\\main\\resources\\sensor.txt";

    /**
     * Entry point.
     *
     * @param args optional; {@code args[0]} may override the input file path
     *             (defaults to {@link #DEFAULT_INPUT_PATH})
     * @throws Exception if the Flink job fails to execute
     */
    public static void main(String[] args) throws Exception {
        // Allow the input path to be passed on the command line; keep the
        // original hard-coded path as the backward-compatible default.
        final String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism( 1 );
        // Use event time so the rowtime attribute below is driven by record timestamps.
        env.setStreamTimeCharacteristic( TimeCharacteristic.EventTime );

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create( env );

        // 2. Read the input file as a DataStream of raw lines.
        DataStream<String> inputStream = env.readTextFile( inputPath );

        // 3. Parse each CSV line into a SensorReading POJO and assign event-time
        //    timestamps with a 2-second bounded-out-of-orderness watermark.
        DataStream<SensorReading> dataStream = inputStream.map( line -> {
            String[] fields = line.split( "," );
            // parseLong/parseDouble avoid the deprecated boxing constructors
            // new Long(...) / new Double(...).
            return new SensorReading( fields[0], Long.parseLong( fields[1] ), Double.parseDouble( fields[2] ) );
        } )
                .assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor<SensorReading>( Time.seconds( 2 ) ) {
                    @Override
                    public long extractTimestamp(SensorReading element) {
                        // Source timestamps are epoch seconds; Flink expects milliseconds.
                        return element.getTimestamp() * 1000L;
                    }
                } );

        // 4. Convert the stream to a Table, declaring `rt` as the rowtime attribute.
        //    `timestamp` is renamed to `ts` because `timestamp` is a reserved word.
        Table dataTable = tableEnv.fromDataStream( dataStream, "id, timestamp as ts, temperature as temp, rt.rowtime" );

        // 5. Over-window aggregation.
        // 5.1 Table API: partition by id, order by rowtime, window of the
        //     preceding 2 rows plus the current row.
        Table overTable = dataTable.window( Over.partitionBy( "id" ).orderBy( "rt" ).preceding( "2.rows" ).as( "ow" ) )
                .select( "id,rt,id.count over ow,temp.avg over ow" );

        tableEnv.createTemporaryView( "sensor", dataTable );
        // 5.2 SQL: the equivalent query using a named WINDOW clause.
        Table overSQL = tableEnv.sqlQuery("select id, rt, count(id) over ow, avg(temp) over ow " +
                " from sensor " +
                " window ow as (partition by id order by rt rows between 2 preceding and current row)");

        // Over windows emit one row per input row, so append streams suffice.
        tableEnv.toAppendStream( overTable, Row.class ).print("Table");
        tableEnv.toAppendStream( overSQL, Row.class ).print("SQL");

        env.execute( "Table_Over_Window" );
    }
}
